import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../common/test_page.dart';

/// Base delay shared by the tests in this file.
///
/// It is the time an asynchronous resource creation or asynchronous close
/// takes, and the unit several tests scale (`* 1.2`, `* 2`) when waiting.
const resourceDuration = Duration(milliseconds: 5);

/// A fake resource that only tracks whether it has been closed.
///
/// Closing it twice, through either [close] or [closeSync], throws a
/// [StateError].
class MockResource {
  /// Creates a resource that is not closed yet.
  MockResource();

  bool _closed = false;

  /// Whether [close] or [closeSync] has already been called.
  bool get isClosed => _closed;

  /// Marks the resource as closed and returns a future that completes after
  /// [resourceDuration].
  ///
  /// The closed flag is set synchronously, before the returned future
  /// completes. Throws a [StateError] synchronously if the resource was
  /// already closed.
  Future<void> close() {
    _markClosed();
    return Future<void>.delayed(resourceDuration);
  }

  /// Marks the resource as closed.
  ///
  /// Throws a [StateError] if the resource was already closed.
  void closeSync() => _markClosed();

  // Shared guard for both close variants: rejects a second close, otherwise
  // flips the flag.
  void _markClosed() {
    if (_closed) {
      throw StateError('Resource has already been closed.');
    }
    _closed = true;
  }
}

/// How the test disposer releases a [MockResource]: through the synchronous
/// `closeSync` or the asynchronous `close`.
enum Close { sync, async }

/// How the test resource factory produces a [MockResource]: immediately, or
/// through a delayed future.
enum Create { sync, async }

/// Test page that registers the `Rx.using` scenarios.
///
/// Each scenario is registered once for every combination of [Close] and
/// [Create], so synchronous and asynchronous resource creation and disposal
/// are all exercised.
///
/// NOTE(review): `group`, `test` and the single-argument `expect` are not
/// defined in this file; presumably they come from [TestPage]. Confirm what
/// `expect` actually verifies before relying on these cases as assertions.
class UsingTestPage extends TestPage {
  UsingTestPage(super.title) {
    for (final close in Close.values) {
      for (final create in Create.values) {
        final groupPrefix =
            'Rx.using.${create.toString().toLowerCase()}.${close.toString().toLowerCase()}';

        group(groupPrefix, () {
          // Fixture state shared by all tests of this group.
          late MockResource resource;
          var isResourceCreated = false;

          // Resets the creation flag; every test calls this first.
          void setUp() {
            isResourceCreated = false;
          }

          // Creates the resource either immediately or after
          // `resourceDuration`, depending on `create`, and records that the
          // creation happened.
          FutureOr<MockResource> resourceFactory() {
            switch (create) {
              case Create.sync:
                isResourceCreated = true;
                return resource = MockResource();
              case Create.async:
                return Future<MockResource>.delayed(resourceDuration, () {
                  isResourceCreated = true;
                  return resource = MockResource();
                });
            }
          }

          // Resource factory that always fails: throws synchronously, or
          // returns a future that fails after `resourceDuration`.
          FutureOr<MockResource> resourceFactoryThrows() {
            switch (create) {
              case Create.sync:
                throw Exception();
              case Create.async:
                return Future<MockResource>.delayed(
                  resourceDuration,
                  () => throw Exception(),
                );
            }
          }

          // Closes the given resource synchronously or asynchronously,
          // depending on `close`.
          FutureOr<void> disposer(MockResource resource) {
            switch (close) {
              case Close.sync:
                // The cast lets a `void` call be returned as FutureOr<void>.
                // ignore: unnecessary_cast
                return resource.closeSync() as FutureOr<void>;
              case Close.async:
                return resource.close();
            }
          }

          // Disposer that always fails: throws synchronously, or returns a
          // future that fails after `resourceDuration`.
          FutureOr<void> disposerThrows(MockResource resource) {
            switch (close) {
              case Close.sync:
                throw Exception();
              case Close.async:
                return Future<void>.delayed(
                  resourceDuration,
                  () => throw Exception(),
                );
            }
          }

          // Stream factory emits 1, 2, 3 and then completes.
          test('$groupPrefix.done', () async {
            setUp();
            final stream = Rx.using<int, MockResource>(
              resourceFactory,
              (resource) => Stream.value(resource)
                  .flatMap((_) => Stream.fromIterable([1, 2, 3])),
              disposer,
            );

            expect(stream);

            expect(isResourceCreated);
            expect(resource.isClosed);
          });

          // The resource factory fails; tracks whether the stream factory
          // and the disposer get invoked.
          test('$groupPrefix.resourceFactory.throws', () async {
            setUp();
            var streamFactoryCalled = false;
            var disposerCalled = false;

            final stream = Rx.using<int, MockResource>(
              resourceFactoryThrows,
              (resource) {
                streamFactoryCalled = true;
                return Rx.range(0, 3);
              },
              (resource) {
                disposerCalled = true;
                return disposer(resource);
              },
            );

            expect(stream);

            expect(isResourceCreated);
            expect(streamFactoryCalled);
            expect(disposerCalled);
          });

          // The disposer fails; the future returned by cancel() is handed to
          // expect.
          test('$groupPrefix.disposer.throws', () async {
            setUp();
            final subscription = Rx.using<int, MockResource>(
              resourceFactory,
              (resource) => Rx.timer(0, resourceDuration),
              disposerThrows,
            ).listen(null);

            // Give an asynchronously created resource time to exist before
            // cancelling.
            if (create == Create.async) {
              await Future<void>.delayed(resourceDuration * 1.2);
            }

            expect(subscription.cancel());
          });

          // The stream factory throws instead of returning a stream.
          test('$groupPrefix.streamFactory.throws', () async {
            setUp();
            final stream = Rx.using<int, MockResource>(
              resourceFactory,
              (resource) => throw Exception(),
              disposer,
            );

            expect(stream);

            expect(isResourceCreated);
            expect(resource.isClosed);
          });

          // The stream factory returns a stream that emits an error.
          test('$groupPrefix.streamFactory.errors', () async {
            setUp();
            final stream = Rx.using<int, MockResource>(
              resourceFactory,
              (resource) => Stream.error(Exception()),
              disposer,
            );

            expect(stream);

            expect(isResourceCreated);
            expect(resource.isClosed);
          });

          // Cancels the subscription while the inner timer is still pending.
          test('$groupPrefix.cancel.delayed', () async {
            setUp();
            const timerDelay = Duration(milliseconds: 200);

            final subscription = Rx.using<int, MockResource>(
              resourceFactory,
              (resource) => Rx.concat([
                Rx.timer(0, timerDelay),
                Stream.error(Exception()),
              ]),
              disposer,
            ).listen(null, cancelOnError: false);

            // ensure the stream has started
            await Future<void>.delayed(resourceDuration + timerDelay ~/ 2);
            await subscription.cancel();
            await Future<void>.delayed(resourceDuration * 1.2);

            expect(isResourceCreated);
            expect(resource.isClosed);
          });

          // Cancels the subscription right after listening; every listener
          // callback reports to expect if it is ever invoked.
          test('$groupPrefix.cancel.immediately', () async {
            setUp();
            final subscription = Rx.using<int, MockResource>(
              resourceFactory,
              (resource) => Rx.concat([
                Rx.timer(0, const Duration(milliseconds: 10)),
                Stream.error(Exception()),
              ]),
              disposer,
            ).listen(
              (v) => expect(true),
              onError: (Object e, StackTrace stackTrace) => expect(true),
              onDone: () => expect(true),
            );

            await subscription.cancel();
            await Future<void>.delayed(resourceDuration * 2);

            expect(isResourceCreated);
            expect(resource.isClosed);
          });

          // Listens with cancelOnError: false and inspects the fixture before
          // the inner timer (2 * resourceDuration) has fired.
          test('$groupPrefix.errors.continueOnError', () async {
            setUp();
            Rx.using<int, MockResource>(
              resourceFactory,
              (resource) => Rx.concat([
                Rx.timer(0, resourceDuration * 2),
                Stream<int>.error(Exception()),
              ]),
              disposer,
            ).listen(
              null,
              onError: (Object e, StackTrace s) {},
              cancelOnError: false,
            );

            await Future<void>.delayed(resourceDuration * 1.2);
            expect(isResourceCreated);
            expect(resource.isClosed);
          });

          // Listens with cancelOnError: true to a stream that errors at once.
          test('$groupPrefix.errors.cancelOnError', () async {
            setUp();
            Rx.using<int, MockResource>(
              resourceFactory,
              (resource) => Stream.error(Exception()),
              disposer,
            ).listen(
              null,
              onError: (Object e, StackTrace s) {},
              cancelOnError: true,
            );

            await Future<void>.delayed(resourceDuration * 1.2);
            expect(isResourceCreated);
            expect(resource.isClosed);
          });

          // Listens once, then hands a closure performing a second listen to
          // expect.
          test('$groupPrefix.single.subscription', () async {
            setUp();
            final stream = Rx.using<int, MockResource>(
              resourceFactory,
              (resource) => Rx.range(0, 3),
              disposer,
            );
            stream.listen(null);
            expect(() => stream.listen(null));
          });

          // Converts the stream to a broadcast stream and listens twice.
          test('$groupPrefix.asBroadcastStream', () async {
            setUp();
            final stream = Rx.using<int, MockResource>(
              resourceFactory,
              (resource) => Stream.periodic(
                const Duration(milliseconds: 50),
                (i) => i,
              ),
              disposer,
            ).asBroadcastStream(onCancel: (s) => s.cancel());

            final first = stream.listen(null);
            final second = stream.listen(null);

            // can reach here
            expect(true);

            await Future<void>.delayed(resourceDuration * 1.2);
            await first.cancel();
            await second.cancel();
            expect(resource.isClosed);
          });

          // Pauses the subscription for 50 ms right after listening; the
          // first delivered value cancels the subscription and is handed to
          // expect.
          test('$groupPrefix.pause.resume', () async {
            setUp();
            late StreamSubscription<int> subscription;

            subscription = Rx.using<int, MockResource>(
              resourceFactory,
              (resource) => Stream.periodic(
                const Duration(milliseconds: 20),
                (i) => i,
              ),
              disposer,
            ).listen((value) {
              subscription.cancel();
              expect(value);
            });

            subscription
                .pause(Future<void>.delayed(const Duration(milliseconds: 50)));
          });
        });
      }
    }
  }
}